home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.5)
-
- '''Glue code to handle BitTorrent stuff. Most of this comes from download.py
- in the BitTorrent library.
- '''
- from urlparse import urljoin
- from binascii import b2a_hex
- from sha import sha
- from os import path, makedirs
- from socket import error as socketerror
- from random import seed
- from threading import Thread, Event, Lock
- from time import time
- from Queue import Queue
-
- try:
- from os import getpid
- except ImportError:
-
- def getpid():
- return 1
-
-
- from BitTorrent.bitfield import Bitfield
- from BitTorrent.btformats import check_message
- from BitTorrent.Choker import Choker
- from BitTorrent.Storage import Storage
- from BitTorrent.StorageWrapper import StorageWrapper
- from BitTorrent.Uploader import Upload
- from BitTorrent.Downloader import Downloader
- from BitTorrent.Connecter import Connecter
- from BitTorrent.Encrypter import Encoder
- from BitTorrent.RawServer import RawServer
- from BitTorrent.Rerequester import Rerequester
- from BitTorrent.DownloaderFeedback import DownloaderFeedback
- from BitTorrent.RateMeasure import RateMeasure
- from BitTorrent.CurrentRateMeasure import Measure
- from BitTorrent.PiecePicker import PiecePicker
- from BitTorrent.bencode import bencode, bdecode
- from BitTorrent.download import defaults
- from BitTorrent import version
- from natpunch import UPnP_test, UPnP_open_port, UPnP_close_port
- import util
- import config as dtv_config
- import prefs
- config = { }
- for key, default, description in defaults:
- config[key] = default
-
- config['report_hash_failures'] = True
- storage_lock = Lock()
- upnp_type = UPnP_test(1)
- downloader_count = util.ThreadSafeCounter()
-
- def calc_max_upload_rate():
- total_rate = int(dtv_config.get(prefs.UPSTREAM_LIMIT_IN_KBS) * 1024)
- downloaders = downloader_count.getvalue()
- if downloaders != 0:
- return total_rate / downloaders
- else:
- return 0
-
-
- class TorrentDownload:
-
- def __init__(self, torrent_data, download_to, fast_resume_data = None):
- """Create a new torrent. torrent_data is the contents of a torrent
- file/url. download_to is the file/directory to save the torrent to.
- fast_resume_data is data used to quickly restart the torrent, it's
- returned by the shutdown() method.
- """
- self.doneflag = Event()
- self.finflag = Event()
- self.torrent_data = torrent_data
- self.download_to = download_to
- self.fast_resume_data = fast_resume_data
- self.fast_resume_queue = Queue()
- self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'], config['timeout'], errorfunc = self.on_error, maxconnects = config['max_allow_in'])
- self.thread = None
- self.current_status = { }
- self.status_callback = None
-
- self.time_est_func = lambda : 0
- self.last_up_total = self.last_down_total = 0
- self.last_activity = None
- self.rawserver_started = False
- self.minport = dtv_config.get(prefs.BT_MIN_PORT)
- self.maxport = dtv_config.get(prefs.BT_MAX_PORT)
-
-
- def start(self):
- '''Start downloading the torrent.'''
- self.thread = Thread(target = self.download_thread)
- filename = path.basename(self.download_to)
- self.thread.setName('BitTorrent Downloader - %s' % util.stringify(filename, 'replace'))
- self.thread.start()
-
-
- def shutdown(self):
- '''Stop downloading the torrent.
-
- Returns a string that can be used as fast resume data.
- '''
- self.doneflag.set()
- self.rawserver.wakeup()
- if self.rawserver_started:
-
- try:
- return self.fast_resume_queue.get(timeout = 10)
- except Queue.Empty:
- return None
- except:
- None<EXCEPTION MATCH>Queue.Empty
-
-
- None<EXCEPTION MATCH>Queue.Empty
- return self.fast_resume_data
-
-
- def parse_fast_resume_data(self, total_pieces):
- already_got = None
- mtimes = { }
- if self.fast_resume_data is not None:
-
- try:
- fast_resume = bdecode(self.fast_resume_data)
- already_got = fast_resume['already_got']
- mtimes = fast_resume['mtimes']
- import traceback
- print 'WARNING: ERROR parsing fast resume data'
- traceback.print_exc(1)
- self.fast_resume_data = None
-
-
-
- try:
- self.pieces_already_got = Bitfield(total_pieces, already_got)
- except:
- print 'Failed to load resume data'
- self.pieces_already_got = Bitfield(total_pieces, None)
-
- self.fast_resume_mtimes = mtimes
-
-
- def skip_hash_check(self, index, files):
- if not self.pieces_already_got[index]:
- return False
-
- for f in files:
- mtimes_key = f.encode('utf-8')
- if path.getmtime(f) > self.fast_resume_mtimes.get(mtimes_key, 0):
- return False
- continue
-
- return True
-
-
- def set_status_callback(self, func):
- """Register a callback function. func will be called whenever the
- torrent download status changes and periodically while the torrent
- downloads. It will be passed a dict with the following attributes:
-
- activity -- string specifying what's currently happening or None for
- normal operations.
- upRate -- upload rate in B/s
- downRate -- download rate in B/s
- upTotal -- total MB uploaded (this run)
- downTotal -- total MB downloaded (this run)
- fractionDone -- what portion of the download is completed.
- timeEst -- estimated completion time, in seconds.
- totalSize -- total size of the torrent in bytes
- """
- self.status_callback = func
-
-
- def on_error(self, message):
- print 'WARNING BitTorrent error: ', message
-
-
- def on_status(self, status_dict):
- status = {
- 'upRate': status_dict.get('upRate', 0),
- 'downRate': status_dict.get('downRate', 0),
- 'upTotal': status_dict.get('upTotal', self.last_up_total),
- 'downTotal': status_dict.get('downTotal', self.last_down_total),
- 'timeEst': self.time_est_func(),
- 'totalSize': self.total_size }
- if status['timeEst'] is None:
- status['timeEst'] = 0
-
- if self.finflag.isSet():
- status['fractionDone'] = 1
- else:
- status['fractionDone'] = status_dict.get('fractionDone', 0)
- if status['downRate'] > 0 or status['upRate'] > 0:
- status['activity'] = None
- else:
- status['activity'] = status_dict.get('activity', self.last_activity)
- self.last_up_total = status['upTotal']
- self.last_down_total = status['downTotal']
- self.last_activity = status['activity']
- self.status_callback(status)
-
-
- def update_max_upload_rate(self):
- current_rate = calc_max_upload_rate()
- if current_rate != self.max_upload_rate:
- self.connecter.change_max_upload_rate(current_rate)
- self.max_upload_rate = current_rate
-
- self.rawserver.add_task(self.update_max_upload_rate, 5)
-
-
- def filefunc(self, file, length, saveas, isdir):
- self.total_size = length
- return self.download_to
-
-
- def download_thread(self):
- downloader_count.inc()
-
- try:
- self.download()
- finally:
- downloader_count.dec()
-
-
-
- def download(self):
- spewflag = Event()
-
- try:
- response = bdecode(self.torrent_data)
- check_message(response)
- except ValueError:
- e = None
- self.on_error('got bad file info - ' + str(e))
- return None
-
-
- try:
-
- def make(f, forcedir = False):
- if not forcedir:
- f = path.split(f)[0]
-
- if f != '' and not path.exists(f):
- makedirs(f)
-
-
- info = response['info']
- if info.has_key('length'):
- file_length = info['length']
- file = self.filefunc(info['name'], file_length, config['saveas'], False)
- if file is None:
- return None
-
- make(file)
- files = [
- (file, file_length)]
- else:
- file_length = 0
- for x in info['files']:
- file_length += x['length']
-
- file = self.filefunc(info['name'], file_length, config['saveas'], True)
- if file is None:
- return None
-
- make(file, True)
- files = []
- for x in info['files']:
- n = file
- for i in x['path']:
- n = path.join(n, i)
-
- files.append((n, x['length']))
- make(n)
- except OSError:
- e = None
- self.on_error("Couldn't allocate dir - " + str(e))
- return None
-
-
- finflag = self.finflag
- ann = [
- None]
- myid = 'M' + version.replace('.', '-')
- myid = myid + '-' * (8 - len(myid)) + b2a_hex(sha(repr(time()) + ' ' + str(getpid())).digest()[-6:])
- seed(myid)
- pieces = [ info['pieces'][x:x + 20] for x in xrange(0, len(info['pieces']), 20) ]
- self.parse_fast_resume_data(len(pieces))
-
- def failed(reason):
- self.doneflag.set()
- if reason is not None:
- self.on_error(reason)
-
-
- rawserver = self.rawserver
- storage_lock.acquire()
-
- try:
-
- try:
- storage = Storage(files, open, path.exists, path.getsize)
- except IOError:
- ([],)
- e = ([],)
- []
- self.on_error('trouble accessing files - ' + str(e))
- return None
- except:
- ([],)
-
-
- def finished(finflag = finflag, ann = ann, storage = (storage,)):
- finflag.set()
-
- try:
- storage.set_readonly()
- except (IOError, OSError):
- e = None
- self.on_error('trouble setting readonly at end - ' + str(e))
-
- if ann[0] is not None:
- ann[0](1)
-
-
- rm = [
- None]
-
- def data_flunked(amount, rm = rm, report_hash_failures = (config['report_hash_failures'],)):
- if rm[0] is not None:
- rm[0](amount)
-
- if report_hash_failures:
- self.on_error('a piece failed hash check, re-downloading it')
-
-
- storagewrapper = StorageWrapper(storage, config['download_slice_size'], pieces, info['piece length'], finished, failed, self.on_status, self.doneflag, config['check_hashes'], data_flunked, self.skip_hash_check)
- except ValueError:
- ([],)
- e = ([],)
- []
- failed('bad data - ' + str(e))
- except IOError:
- []
- e = []
- failed('IOError - ' + str(e))
- except:
- ([],)
- finally:
- storage_lock.release()
-
- e = 'maxport less than minport - no ports to check'
- for listen_port in xrange(self.minport, self.maxport + 1):
-
- try:
- rawserver.bind(listen_port, config['bind'])
- continue
- except socketerror:
- None if self.doneflag.isSet() else []
- e = None if self.doneflag.isSet() else []
- continue
-
-
- else:
- return None
- choker = Choker(config['max_uploads'], rawserver.add_task, finflag.isSet, config['min_uploads'])
- upmeasure = Measure(config['max_rate_period'], config['upload_rate_fudge'])
- downmeasure = Measure(config['max_rate_period'])
-
- def make_upload(connection, choker = choker, storagewrapper = storagewrapper, max_slice_length = config['max_slice_length'], max_rate_period = config['max_rate_period'], fudge = config['upload_rate_fudge']):
- return Upload(connection, choker, storagewrapper, max_slice_length, max_rate_period, fudge)
-
- ratemeasure = RateMeasure(storagewrapper.get_amount_left())
- self.time_est_func = ratemeasure.get_time_left
- rm[0] = ratemeasure.data_rejected
- picker = PiecePicker(len(pieces), config['rarest_first_cutoff'])
- for i in xrange(len(pieces)):
- if storagewrapper.do_I_have(i):
- picker.complete(i)
- continue
- None if self.doneflag.isSet() else [] if upnp_type else None if self.doneflag.isSet() else []
-
- downloader = Downloader(storagewrapper, picker, config['request_backlog'], config['max_rate_period'], len(pieces), downmeasure, config['snub_time'], ratemeasure.data_came_in)
- self.max_upload_rate = calc_max_upload_rate()
- connecter = Connecter(make_upload, downloader, choker, len(pieces), upmeasure, self.max_upload_rate, rawserver.add_task)
- self.connecter = connecter
- infohash = sha(bencode(info)).digest()
- encoder = Encoder(connecter, rawserver, myid, config['max_message_length'], rawserver.add_task, config['keepalive_interval'], infohash, config['max_initiate'])
- rerequest = Rerequester(response['announce'], config['rerequest_interval'], rawserver.add_task, connecter.how_many_connections, config['min_peers'], encoder.start_connection, rawserver.add_task, storagewrapper.get_amount_left, upmeasure.get_total, downmeasure.get_total, listen_port, config['ip'], myid, infohash, config['http_timeout'], self.on_error, config['max_initiate'], self.doneflag, upmeasure.get_rate, downmeasure.get_rate, encoder.ever_got_incoming)
- if config['spew']:
- spewflag.set()
-
- DownloaderFeedback(choker, rawserver.add_task, self.on_status, upmeasure.get_rate, downmeasure.get_rate, upmeasure.get_total, downmeasure.get_total, ratemeasure.get_time_left, ratemeasure.get_size_left, file_length, finflag, config['display_interval'], spewflag)
- self.on_status({
- 'activity': 'connecting to peers' })
- ann[0] = rerequest.announce
- rerequest.begin()
- self.rawserver.add_task(self.update_max_upload_rate, 5)
- self.rawserver_started = True
-
- try:
- rawserver.listen_forever(encoder)
- finally:
-
- try:
- []['mtimes'] = []([ (f, long(path.getmtime(f))) for f, size in files ])
- fast_resume_data = dict
- self.fast_resume_queue.put(bencode(fast_resume_data))
- except:
- self.fast_resume_queue.put(None)
- raise
-
-
- storage.close()
- if upnp_active:
- UPnP_close_port(listen_port)
-
- rerequest.announce(2)
-
-
-